package bigdata.jobclean

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Encoders, Row, RowFactory, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apdplat.word.WordSegmenter

import scala.collection.mutable.ArrayBuffer
import scala.io.Source

/** Empty companion class of the [[WordCount2]] object; exists only so the
  * object has a companion — it carries no state or behavior. */
class WordCount2 {

}
/**
 * Counts skill-requirement word frequencies per job category (cate1, cate2)
 * across all job postings, keeping the top-100 words for each category pair.
 *
 * Input : cleaned CSV rows (schema defined in `main`), path from args(0)
 *         or the default `result/cleaned/*`.
 * Output: CSV of (cate1, cate2, word, count, rank), path from args(1)
 *         or the default `result/cate_wc`.
 */
object WordCount2
{

  // All custom stop words, loaded once from the bundled resource file.
  var stopwords = List[String]()

  /**
   * Loads the custom stop-word list from the classpath resource
   * `/work_words.txt` (one word per line) into `stopwords`.
   * Closes the reader when done; logs a warning if the resource is missing
   * instead of throwing a NullPointerException.
   */
  def loadStopwords(): Unit = {
    val stream = getClass.getResourceAsStream("/work_words.txt")
    if (stream == null) {
      println("warning: /work_words.txt not found on classpath; no custom stopwords loaded")
    } else {
      val source = Source.fromInputStream(stream)
      try {
        // Append all lines in one pass instead of one `:+` per line
        // (List append is O(n), so the original loop was O(n^2)).
        stopwords = stopwords ++ source.getLines()
      } finally {
        source.close()
      }
    }
  }

  /**
   * Returns true when `word` should be dropped from the word counts:
   * "x年" experience phrases, single-character tokens (digits, punctuation),
   * or anything in the custom stop-word list.
   */
  def isStopWord(word: String): Boolean =
    word.endsWith("年") || word.length == 1 || stopwords.contains(word)

  /**
   * Segments a job-detail string into words and filters out stop words.
   * Returns an empty array for a null input (CSV rows may have null cells).
   */
  def word_split(detail: String): Array[String] = {
    if (detail == null) {
      Array.empty[String]
    } else {
      val segments = WordSegmenter.seg(detail)
      val tokens = ArrayBuffer[String]()
      var i = 0
      while (i < segments.size()) {
        val word = segments.get(i).getText
        if (!isStopWord(word)) tokens += word
        i += 1
      }
      tokens.toArray
    }
  }

  // Load the stop words once, when the object is first referenced.
  loadStopwords()

  def main(args: Array[String]): Unit = {

    var clean_path = "result/cleaned/*"
    var out_dir = "result/cate_wc"

    // BUGFIX: the original guarded only on args.length > 0 but then read
    // args(1), which throws ArrayIndexOutOfBoundsException when exactly one
    // argument is supplied. Both paths must be present to override defaults.
    if (args.length >= 2) {
      clean_path = args(0)
      out_dir = args(1)
      println(s"使用命令行参数: clean_path: ${clean_path} ,out_dir:${out_dir}")
    }

    // UDF that segments the `detail` column into an array of words.
    val word_udf = udf(word_split _)

    println("加载自定义停用词:" + stopwords)

    val spark = SparkSession.builder()
      .master("local[2]").appName("jobClean").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // All columns are read as plain strings; only cate1/cate2/detail are used below.
    val schema = StructType(
      Array("url", "name", "salary", "province", "city", "exp", "edu", "num",
        "pubtime", "cname", "ctype", "ctrade", "cnum", "cate1", "cate2",
        "welfare", "detail")
        .map(name => StructField(name, DataTypes.StringType)))

    // Read the cleaned job postings (headerless CSV).
    val data = spark.read.option("header", "false").schema(schema).csv(clean_path)

    // Explode `detail` into one (cate1, cate2, word) row per token, then
    // count occurrences per category pair and word.
    val words = data.select(col("cate1"), col("cate2"),
      explode(word_udf(col("detail"))).alias("word")
    ).groupBy("cate1", "cate2", "word").agg(count("word").alias("count"))

    // Rank words within each (cate1, cate2) group by descending count and
    // keep the top 100 per group.
    val window = Window.partitionBy("cate1", "cate2").orderBy(desc("count"))
    val result = words.withColumn("rank", row_number().over(window))
      .filter(col("rank") <= 100)

    // Print the output column names for quick inspection.
    println(result.columns.mkString(","))

    // Save the ranked word counts, overwriting any previous run.
    result.write.option("header", "true").mode(SaveMode.Overwrite).csv(out_dir)

    spark.stop()
    // BUGFIX: exit with 0 on success — the original called System.exit(2),
    // which signals failure to the calling shell / scheduler.
    System.exit(0)
  }
}
